package util;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class CDCUtil {


    public static DataStreamSource<String> getMysqlSource(StreamExecutionEnvironment env,String databaseName) {

        Properties properties = new Properties();
        properties.setProperty("decimal.handling.mode", "double");
        properties.setProperty("time.precision.mode", "connect");
        properties.setProperty("useSSL", "false");
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("node101")
                .port(3306)
                .debeziumProperties(properties)
                .jdbcProperties(properties)
                .databaseList(databaseName) // 设置捕获的数据库， 如果需要同步整个数据库，请将 tableList 设置为 ".*".
                .startupOptions(StartupOptions.earliest())  // ToDo 默认采集全量
                .tableList(
                        databaseName + ".*"
                ) // 设置捕获的表
                .username("root")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串
                .build();
        env.setParallelism(1);
        // 设置 3s 的 checkpoint 间隔
        env.enableCheckpointing(3000);
        //2.2 指定CK 的一致性语义
//        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //2.3 设置任务关闭的时候保留最后一次 CK 数据
//        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//2.4 指定从CK 自动重启策略 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
//2.5 设置状态后端
//        env.setStateBackend(new FsStateBackend("hdfs://node101:8020/flinkCDC"));
// 2.6 设置访问 HDFS 的用户名 System.setProperty("HADOOP_USER_NAME", "root");

     return env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
    }


}
